import numpy as np
import cv2
import glob
import matplotlib.pyplot as plt
# Get calibration coefficients of the camera
def getCalibration(pathToImages, kernel=(9, 6), plot=0):
    """Compute the camera matrix and distortion coefficients from chessboard images.

    Args:
        pathToImages: Path prefix; every file matching
            ``pathToImages + '*.jpg'`` is considered.
        kernel: Number of inner chessboard corners as ``(nx, ny)``.
        plot: When truthy, show each image in which corners were found, with
            the corners drawn on it.

    Returns:
        The ``(mtx, dist)`` pair produced by ``cv2.calibrateCamera``.

    Raises:
        ValueError: If no image produced a complete set of chessboard corners
            (including the case where the glob matches nothing).
    """
    nx, ny = kernel[0], kernel[1]

    # Reference grid (0,0,0), (1,0,0), ... on the z = 0 plane.
    objp = np.zeros((nx * ny, 3), np.float32)
    objp[:, :2] = np.mgrid[0:nx, 0:ny].T.reshape(-1, 2)

    objpoints = []
    imgpoints = []
    image_size = None
    for fname in glob.glob(pathToImages + '*.jpg'):
        image = cv2.imread(fname)
        if image is None:
            # Unreadable file: skip it instead of crashing in cvtColor.
            continue
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        image_size = gray.shape[::-1]
        found, corners = cv2.findChessboardCorners(gray, (nx, ny), None)
        if not found:
            continue
        imgpoints.append(corners)
        objpoints.append(objp)
        if plot:
            img = cv2.drawChessboardCorners(image, (nx, ny), corners, found)
            f, p1 = plt.subplots(1, 1, figsize=(5, 5))
            p1.set_title('Original: ' + fname)
            # imread yields BGR; matplotlib expects RGB.
            p1.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            p1.axis('off')
            plt.show()

    if not objpoints:
        raise ValueError(
            'No chessboard corners found in images matching '
            + pathToImages + '*.jpg')

    ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(
        objpoints, imgpoints, image_size, None, None)
    return mtx, dist
# Calibrate the camera once; mtx and dist are read by the code below.
mtx, dist = getCalibration('camera_cal/calibration', kernel=(9, 6), plot=0)
# Plot undistorted images with calibration data
def plotUndistorted(pathToImages, mtx, dist):
    """Show each matching image side by side with its undistorted version.

    Args:
        pathToImages: Path prefix; files matching ``pathToImages + '*.jpg'``
            are displayed.
        mtx: Camera matrix passed to ``cv2.undistort``.
        dist: Distortion coefficients passed to ``cv2.undistort``.
    """
    for fname in glob.glob(pathToImages + '*.jpg'):
        figure, (left_ax, right_ax) = plt.subplots(1, 2, figsize=(10, 5))
        original_bgr = cv2.imread(fname)
        corrected_bgr = cv2.undistort(original_bgr, mtx, dist, None, mtx)
        panels = (
            (left_ax, 'Original: ', original_bgr),
            (right_ax, 'Undistorted: ', corrected_bgr),
        )
        for axis, prefix, bgr in panels:
            axis.set_title(prefix + fname)
            # Convert for matplotlib, which expects RGB channel order.
            axis.imshow(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
            axis.axis('off')
        plt.show()
plotUndistorted('camera_cal/calibration', mtx=mtx, dist=dist)
# Extract grids in images, to visualize quality of calibration
def getGridAreas(pathToImages, mtx, dist, kernel=(9, 6)):
    """Show each chessboard image next to a perspective-rectified view of its grid.

    For every image in which the full corner grid is detected after
    undistortion, the four outer corners are mapped onto a rectangle inset
    100 px from the image border and the warped result is displayed.

    Args:
        pathToImages: Path prefix; files matching ``pathToImages + '*.jpg'``
            are processed.
        mtx: Camera matrix passed to ``cv2.undistort``.
        dist: Distortion coefficients passed to ``cv2.undistort``.
        kernel: Number of inner chessboard corners as ``(nx, ny)``.
    """
    nx, ny = kernel[0], kernel[1]
    offset = 100
    for fname in glob.glob(pathToImages + '*.jpg'):
        image = cv2.imread(fname)
        if image is None:
            # Unreadable file: nothing to show.
            continue
        undist = cv2.undistort(image, mtx, dist, None, mtx)
        gray = cv2.cvtColor(undist, cv2.COLOR_BGR2GRAY)
        found, corners = cv2.findChessboardCorners(gray, (nx, ny), None)
        if not found:
            continue

        # Draw the corners on the undistorted copy; `image` stays untouched,
        # so it can be shown as the original without reading the file again.
        cv2.drawChessboardCorners(undist, (nx, ny), corners, found)
        img_size = (gray.shape[1], gray.shape[0])
        src = np.float32([corners[0],
                          corners[nx - 1],
                          corners[-1],
                          corners[-nx]])
        dst = np.float32([[offset, offset],
                          [img_size[0] - offset, offset],
                          [img_size[0] - offset, img_size[1] - offset],
                          [offset, img_size[1] - offset]])
        M = cv2.getPerspectiveTransform(src, dst)
        warped = cv2.warpPerspective(undist, M, img_size)

        f, (p1, p2) = plt.subplots(1, 2, figsize=(10, 5))
        f.tight_layout()
        p1.set_title('Original: ' + fname)
        p1.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        p1.axis('off')
        p2.set_title('Grid: ' + fname)
        # The warped image is BGR as well; convert so colours display correctly.
        p2.imshow(cv2.cvtColor(warped, cv2.COLOR_BGR2RGB))
        p2.axis('off')
        plt.show()
getGridAreas('camera_cal/calibration', mtx, dist, kernel=(9, 6))
# Define all used color spaces
def colorSpace(image, color):
    """Return a single channel of a BGR image, selected by name.

    Args:
        image: BGR image array indexed as ``image[row, col, channel]``.
        color: One of ``'bgr_r'``, ``'bgr_g'``, ``'bgr_b'``, ``'hls_h'``,
            ``'hls_l'``, ``'hls_s'``, ``'yuv_y'``, ``'yuv_u'``, ``'lab_b'``
            or ``'gray'``.

    Returns:
        The requested 2-D channel, or ``None`` for an unknown name.
    """
    # Channels that are plain slices of the input.
    direct = {'bgr_b': 0, 'bgr_g': 1, 'bgr_r': 2}
    if color in direct:
        return image[:, :, direct[color]]

    if color == 'gray':
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Channels that need a colour-space conversion first:
    # name -> (cv2 conversion constant name, channel index).
    converted = {
        'hls_h': ('COLOR_BGR2HLS', 0),
        'hls_l': ('COLOR_BGR2HLS', 1),
        'hls_s': ('COLOR_BGR2HLS', 2),
        'yuv_y': ('COLOR_BGR2YUV', 0),
        'yuv_u': ('COLOR_BGR2YUV', 1),
        'lab_b': ('COLOR_BGR2Lab', 2),
    }
    if color not in converted:
        return None
    code_name, channel = converted[color]
    return cv2.cvtColor(image, getattr(cv2, code_name))[:, :, channel]
# Combine the Sobel direction and magnitude tests in a single function
def _rescale_to_uint8(values):
    """Scale a non-negative array so its maximum becomes 255, as uint8."""
    peak = np.max(values)
    if peak == 0:
        # No gradient anywhere: avoid 0/0 and report zero magnitude.
        return np.zeros(values.shape, dtype=np.uint8)
    return np.uint8(255 * values / peak)


def sobel_detect(img, sobel_kernel=3, dir_thresh=(0, np.pi/2, np.pi/6), abs_thresh=(10, 255), mag_thresh=(0, 255)):
    """Build a binary edge mask from Sobel gradient direction and magnitude.

    A pixel is set to 1 when all of the following hold:
      * its gradient direction (``arctan2(|dy|, |dx|)``) lies within
        ``dir_thresh[2]`` of either ``dir_thresh[0]`` or ``dir_thresh[1]``;
      * its Euclidean gradient magnitude, rescaled to 0..255, lies in
        ``mag_thresh``;
      * its Manhattan gradient magnitude, rescaled to 0..255, lies in
        ``abs_thresh``.

    Args:
        img: Single-channel image passed to ``cv2.Sobel``.
        sobel_kernel: Sobel aperture size.
        dir_thresh: ``(angle_a, angle_b, half_width)`` in radians.
        abs_thresh: Inclusive ``(low, high)`` bounds for the Manhattan magnitude.
        mag_thresh: Inclusive ``(low, high)`` bounds for the Euclidean magnitude.

    Returns:
        Array shaped like ``img`` holding 1 where every test passes, else 0.
    """
    sobelx = cv2.Sobel(img, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
    sobely = cv2.Sobel(img, cv2.CV_64F, 0, 1, ksize=sobel_kernel)
    abs_x = np.absolute(sobelx)
    abs_y = np.absolute(sobely)

    absgraddir = np.arctan2(abs_y, abs_x)
    mag_sobel_manhattan = _rescale_to_uint8(abs_x + abs_y)
    mag_sobel_euclidean = _rescale_to_uint8(np.sqrt(sobelx**2 + sobely**2))

    # Direction must fall inside one of the two angular windows.
    near_first = ((absgraddir >= dir_thresh[0] - dir_thresh[2]) &
                  (absgraddir <= dir_thresh[0] + dir_thresh[2]))
    near_second = ((absgraddir >= dir_thresh[1] - dir_thresh[2]) &
                   (absgraddir <= dir_thresh[1] + dir_thresh[2]))
    euclidean_ok = ((mag_sobel_euclidean >= mag_thresh[0]) &
                    (mag_sobel_euclidean <= mag_thresh[1]))
    manhattan_ok = ((mag_sobel_manhattan >= abs_thresh[0]) &
                    (mag_sobel_manhattan <= abs_thresh[1]))

    binary_output = np.zeros_like(absgraddir)
    binary_output[(near_first | near_second) & euclidean_ok & manhattan_ok] = 1
    return binary_output
# Select range of image data
def range_select(img, thresh=(0, 255)):
    """Return a mask with 1 where ``thresh[0] <= img <= thresh[1]``, else 0.

    The result has the same shape and dtype as ``img``.
    """
    lower, upper = thresh[0], thresh[1]
    in_range = np.logical_and(img >= lower, img <= upper)
    mask = np.zeros_like(img)
    mask[in_range] = 1
    return mask
def normalize_channel(image_data):
    """Linearly stretch the values of ``image_data`` to the range 0..255.

    Args:
        image_data: Array-like of numeric values.

    Returns:
        ``uint8`` array of the same shape, with the input minimum mapped to 0
        and the maximum to 255. An empty or constant input has no range to
        stretch and yields all zeros.
    """
    image_data = np.array(image_data)
    if image_data.size == 0:
        return np.zeros(image_data.shape, dtype=np.uint8)

    x_min = image_data.min()
    x_max = image_data.max()
    if x_max == x_min:
        # Constant channel: avoid dividing by zero.
        return np.zeros(image_data.shape, dtype=np.uint8)

    a = 0
    b = 255
    return np.uint8(a + (b - a) * np.true_divide((image_data - x_min), (x_max - x_min)))
# Mild preprocessing of images: equalize luma, then normalize each channel
def preprocess_image(image):
    """Equalize the Y channel of a BGR image and stretch each BGR channel.

    The image is converted to YUV, its first channel is histogram-equalized,
    the result is converted back to BGR, and every BGR channel is passed
    through ``normalize_channel``.

    Args:
        image: BGR image accepted by ``cv2.cvtColor``.

    Returns:
        The processed BGR image.
    """
    luma, chroma_u, chroma_v = cv2.split(cv2.cvtColor(image, cv2.COLOR_BGR2YUV))
    equalized = cv2.equalizeHist(luma)
    bgr = cv2.cvtColor(cv2.merge((equalized, chroma_u, chroma_v)),
                       cv2.COLOR_YUV2BGR)
    for channel in range(3):
        bgr[:, :, channel] = normalize_channel(bgr[:, :, channel])
    return bgr
# Defines the pipeline that extracts lane points from a camera frame
def pipeline(img, plot_masks=0):
    """Undistort a BGR frame and return a binary mask of candidate lane pixels.

    Three detectors are OR-ed together, each requiring both a value range and
    a Sobel response on the same channel: red, HLS saturation, and very bright
    red. Pixels whose red or green value is below 100 are then cleared.

    Reads the module-level ``mtx`` and ``dist`` calibration values.

    Args:
        img: BGR frame.
        plot_masks: When truthy, show the frame, each detector mask, the
            red/green filter and the combined result.

    Returns:
        Single-channel mask (same dtype as the frame) with 1 at lane candidates.
    """
    image = cv2.undistort(img, mtx, dist, None, mtx)
    lane_angles = (0.78, 2.35)

    red = colorSpace(image, 'bgr_r')
    green = colorSpace(image, 'bgr_g')
    saturation = colorSpace(image, 'hls_s')

    def blank():
        # Fresh all-zero mask shaped like one channel of the frame.
        return np.zeros_like(image[:, :, 0])

    rangedr = range_select(red, thresh=(100, 255))
    sobelr = sobel_detect(red, sobel_kernel=5,
                          dir_thresh=lane_angles + (np.pi/6,),
                          abs_thresh=(70, 255), mag_thresh=(80, 255))
    rangeds = range_select(saturation, thresh=(70, 255))
    sobels = sobel_detect(saturation, sobel_kernel=5,
                          dir_thresh=lane_angles + (np.pi/6,),
                          abs_thresh=(50, 255), mag_thresh=(50, 255))
    rangedur = range_select(red, thresh=(220, 255))
    sobelur = sobel_detect(red, sobel_kernel=5,
                           dir_thresh=lane_angles + (np.pi/5,),
                           abs_thresh=(5, 255), mag_thresh=(5, 255))

    hit_red = (rangedr >= 1) & (sobelr >= 1)
    hit_sat = (rangeds >= 1) & (sobels >= 1)
    hit_bright_red = (rangedur >= 1) & (sobelur >= 1)

    combined = blank()
    combined[hit_bright_red | hit_red | hit_sat] = 1

    # Prune all points not red or green enough (keeps yellow and white).
    filterr = range_select(red, thresh=(100, 255))
    filterg = range_select(green, thresh=(100, 255))
    combined[(filterr == 0) | (filterg == 0)] = 0

    if plot_masks:
        views = []
        for flags in (hit_red, hit_sat, hit_bright_red,
                      (filterr >= 1) & (filterg >= 1)):
            mask = blank()
            mask[flags] = 1
            views.append(mask)
        views.append(combined)

        f, axes = plt.subplots(1, 6, figsize=(16, 4))
        f.tight_layout()
        axes[0].imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        for axis, view in zip(axes[1:], views):
            axis.imshow(view, cmap='gray')
        for axis in axes:
            axis.axis('off')
        plt.show()
    return combined
# Show pipeline behavior for images in test set
def appyPipeline(pathToImages, pipeline):
    """Run ``pipeline(image, 1)`` on every ``*.jpg`` under a path prefix.

    Args:
        pathToImages: Path prefix; files matching ``pathToImages + '*.jpg'``
            are read with ``cv2.imread``.
        pipeline: Callable taking ``(image, flag)``; it is called with flag 1
            and its return value is discarded.
    """
    for fname in glob.glob(pathToImages + '*.jpg'):
        pipeline(cv2.imread(fname), 1)
appyPipeline('test_images/', pipeline=pipeline)
# Warp the image to a top-down view; also return M and Minv for later use
def transform(image, erode=1):
    """Warp an image with a fixed perspective transform and binarize it.

    The source quadrilateral is expressed as fractions of the image size
    (obtained by empirical trials); the destination is a rectangle inset
    300 px from the left and right edges. The warped image is optionally
    eroded then dilated with a 3x3 kernel, resized to a quarter of the width
    and twice the height, and finally set to 1 wherever it is positive.

    Args:
        image: Input image; its size is taken from ``image.shape``.
        erode: When truthy, apply the erode/dilate step before resizing.

    Returns:
        ``(saturated, M, Minv)``: the binarized resized image, the forward
        perspective matrix and its inverse.
    """
    width, height = image.shape[1], image.shape[0]
    img_size = (width, height)
    offset = 300
    tolerance = 0.04

    src = np.float32([
        [width * (0.5 - tolerance), height * 0.625],
        [width * (0.5 + tolerance), height * 0.625],
        [width * 0.9, height],
        [width * 0.1, height],
    ])
    dst = np.float32([
        [offset, 0],
        [width - offset, 0],
        [width - offset, height],
        [offset, height],
    ])
    M = cv2.getPerspectiveTransform(src, dst)
    Minv = cv2.getPerspectiveTransform(dst, src)
    cleaned = cv2.warpPerspective(image, M, img_size)

    if erode:
        # Erosion followed by dilation helps filter out noise.
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
        cleaned = cv2.erode(cleaned, kernel, iterations=1)
        cleaned = cv2.dilate(cleaned, kernel, iterations=1)

    # Work on a narrower image to speed up later processing.
    resized = cv2.resize(cleaned, (0, 0), fx=1/4, fy=8/4,
                         interpolation=cv2.INTER_AREA)
    saturated = np.zeros_like(resized)
    saturated[resized > 0] = 1
    return saturated, M, Minv
from sklearn.cluster import DBSCAN
from sklearn.datasets.samples_generator import make_blobs
from sklearn.preprocessing import StandardScaler
# Obtain clusters of pixels in an image section and return them along with
# how many points fell into each cluster; this abstracts the lane-marker
# pixels into a few representative points.
def clusterizeSection(img, pxradius=20):
    """Cluster the non-zero pixels of ``img`` with DBSCAN.

    Args:
        img: 2-D array; non-zero entries are the points to cluster.
        pxradius: DBSCAN ``eps`` (neighbourhood radius in pixels).

    Returns:
        ``(n_clusters, means, weights)`` where ``means`` is a list of
        ``[x, y]`` cluster centroids (column, row) and ``weights`` the number
        of pixels in each cluster. Points DBSCAN labels as noise (label -1)
        are excluded so that ``n_clusters == len(means)``. When there is no
        cluster, ``(0, None, None)`` is returned.
    """
    idxy, idxx = np.nonzero(img)
    if len(idxy) == 0:
        return 0, None, None

    # One (x, y) row per non-zero pixel.
    X = np.array([idxx, idxy]).T
    labels = DBSCAN(eps=pxradius, min_samples=10).fit(X).labels_

    means = []
    weights = []
    for k in sorted(set(labels)):
        if k == -1:
            # Noise is not a cluster; its centroid would be meaningless.
            continue
        members = X[labels == k]
        means.append(members.mean(axis=0))
        weights.append(len(members))

    if means:
        return len(means), means, weights
    return 0, None, None
# Slides through the image collecting the lane-marker points, for easier
# fitting (i.e. use a few points instead of the whole image)
def getPointBlobs(warped, slices=10, pxradius=20, scalef=1):
    """Cluster the image in half-overlapping horizontal bands, bottom to top.

    Each band is ``warped.shape[0] // slices`` rows tall and successive bands
    move up by half a band. Cluster centroids from ``clusterizeSection`` are
    translated to whole-image coordinates with y measured upward from the
    bottom row, then multiplied by ``scalef``.

    Args:
        warped: 2-D binary image.
        slices: Number of non-overlapping bands the height is divided into.
        pxradius: Forwarded to ``clusterizeSection``.
        scalef: Scale applied to the returned coordinates.

    Returns:
        ``(all_means, all_weights)``: one list of ``[x, y]`` centroids and one
        list of cluster sizes per band that produced at least one cluster.
    """
    height = warped.shape[0]
    all_means, all_weights = [], []
    for i in range(slices * 2 - 1):
        step = height // slices
        end = height - i * (step // 2)
        start = end - step
        n_clusters, means, weights = clusterizeSection(warped[start:end, :], pxradius)
        if n_clusters <= 0:
            continue
        all_means.append([
            [scalef * centre[0], scalef * (height - (start + centre[1]))]
            for centre in means
        ])
        all_weights.append(weights)
    return all_means, all_weights
# First clusterization method: points are classified by (weighted)
# Euclidean distance to every previously labelled point
def clusterizeLanesAll(means, weights, pxtolerance=40, scalef=1):
    """Group centroids into lane candidates, walking upward in y.

    Points are visited in order of increasing y. Each point takes the label of
    the closest already-labelled point whose x differs by less than
    ``pxtolerance * scalef``; closeness is ``sqrt((8*dx)**2 + dy**2)`` so that
    x differences are penalized more than y differences. A point with no
    candidate starts a new label.

    Args:
        means: List of lists of ``[x, y]`` points (one inner list per band).
        weights: Unused; kept for interface compatibility.
        pxtolerance: Maximum x distance, before scaling, to join a label.
        scalef: Scale applied to ``pxtolerance``.

    Returns:
        ``(labels, points_labels, points_values, labels_w)`` as arrays: the
        label ids, the label of each point, the points in visiting order, and
        the number of points per label. All are empty for empty input.
    """
    flat_means = np.array([mean for elements in means for mean in elements])
    if flat_means.size == 0:
        # Nothing to cluster; indexing column 1 below would fail.
        return (np.array([], dtype=int), np.array([], dtype=int),
                np.empty((0, 2)), np.array([], dtype=int))
    flat_means = flat_means[flat_means[:, 1].argsort()]
    max_dx = pxtolerance * scalef

    labels = []
    labels_w = []
    points_labels = []
    points_values = []
    for mean in flat_means:
        meanx = mean[0]
        meany = mean[1]
        label = -1
        if points_values:
            previous = np.array(points_values)
            dx = previous[:, 0] - meanx
            dy = previous[:, 1] - meany
            in_range = np.abs(dx) < max_dx
            if in_range.any():
                # Inflate the x distance to favor closeness in y.
                dist = np.sqrt((8 * dx[in_range])**2 + (1 * dy[in_range])**2)
                label = np.array(points_labels)[in_range][np.argmin(dist)]
        if label == -1:
            # No match found: start a new label.
            label = len(labels)
            labels.append(label)
            labels_w.append(0)
        points_labels.append(label)
        points_values.append([meanx, meany])
        labels_w[label] = labels_w[label] + 1

    return (np.array(labels), np.array(points_labels),
            np.array(points_values), np.array(labels_w))
# As before, but each label keeps a designated 'top' point that is updated
# as the walk moves upward
def clusterizeLanesTop(means, weights, pxtolerance=40, scalef=1):
    """Group centroids into lane candidates by distance to each label's top point.

    Points are visited in order of increasing y. Each point is compared only
    with the current topmost point of every label; among those within
    ``pxtolerance * scalef`` in x, the one minimizing
    ``sqrt((8*dx)**2 + dy**2)`` wins and its top point moves to the new point
    when the new point is higher. A point with no candidate starts a new label.

    Args:
        means: List of lists of ``[x, y]`` points (one inner list per band).
        weights: Unused; kept for interface compatibility.
        pxtolerance: Maximum x distance, before scaling, to join a label.
        scalef: Scale applied to ``pxtolerance``.

    Returns:
        ``(labels, points_labels, points_values, labels_w)`` as arrays: the
        label ids, the label of each point, the points in visiting order, and
        the number of points per label. All are empty for empty input.
    """
    flat_means = np.array([mean for elements in means for mean in elements])
    if flat_means.size == 0:
        # Nothing to cluster; indexing column 1 below would fail.
        return (np.array([], dtype=int), np.array([], dtype=int),
                np.empty((0, 2)), np.array([], dtype=int))
    flat_means = flat_means[flat_means[:, 1].argsort()]
    max_dx = pxtolerance * scalef

    labels = []
    topmost = []
    labels_w = []
    points_labels = []
    points_values = []
    for mean in flat_means:
        meanx = mean[0]
        meany = mean[1]
        label = -1
        if topmost:
            tops = np.array(topmost)
            dx = np.abs(tops[:, 0] - meanx)
            dy = np.abs(tops[:, 1] - meany)
            # Rank labels by x distance, then keep those inside the tolerance.
            order = dx.argsort()
            dx = dx[order]
            dy = dy[order]
            in_range = dx <= max_dx
            if in_range.any():
                dist = np.sqrt((8 * dx[in_range])**2 + (1 * dy[in_range])**2)
                idx = order[in_range][np.argmin(dist)]
                label = idx
                # If the point is higher in y, it becomes the label's top.
                if meany > topmost[idx][1]:
                    topmost[idx] = [meanx, meany]
        if label == -1:
            # No match found: start a new label.
            label = len(labels)
            labels.append(label)
            labels_w.append(0)
            # Store a copy, not a view into flat_means, so later updates of
            # the top point cannot write back into the input rows.
            topmost.append([meanx, meany])
        points_labels.append(label)
        points_values.append([meanx, meany])
        labels_w[label] = labels_w[label] + 1

    return (np.array(labels), np.array(points_labels),
            np.array(points_values), np.array(labels_w))
# Wrapper of the previous functions
def getLaneClusters(warped, order=5, closetol=20, longtol=30, scalef=0.025, plot=0, top=1):
    """Extract centroid points from a warped mask and group them into lanes.

    Args:
        warped: 2-D binary image passed to ``getPointBlobs``.
        order: Number of bands (``slices``) for ``getPointBlobs``.
        closetol: Pixel radius for ``getPointBlobs``.
        longtol: x tolerance for the lane clusterization step.
        scalef: Scale factor forwarded to both steps.
        plot: When truthy, scatter-plot the points coloured by label.
        top: Truthy selects ``clusterizeLanesTop``, otherwise
            ``clusterizeLanesAll``.

    Returns:
        ``(labels, points_labels, points_values, labels_w)`` from the chosen
        clusterization, or four empty lists when no points were found.
    """
    means, weights = getPointBlobs(warped, order, closetol, scalef)
    labels, points_labels, points_values, labels_w = [], [], [], []
    if len(means) > 0:
        clusterize = clusterizeLanesTop if top else clusterizeLanesAll
        labels, points_labels, points_values, labels_w = clusterize(
            means, weights, longtol, scalef)
    if plot:
        unique_labels = set(labels)
        colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))
        for k, col in zip(unique_labels, colors):
            if k == -1:
                # Black used for noise.
                col = 'k'
            xy = points_values[points_labels == k]
            plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=col,
                     markeredgecolor='k', markersize=14)
        plt.show()
    return labels, points_labels, points_values, labels_w
# Load one test frame, show it, then show its warped binary mask.
image = cv2.imread('test_images/test4.jpg')
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()
procimg = pipeline(image)
warped, M, Minv = transform(procimg, erode=1)
plt.figure(figsize=(6, 6))
plt.imshow(warped, cmap='gray', aspect='auto')
plt.show()

# Compare the two clusterization algorithms on the same mask.
plt.figure(figsize=(6, 6))
l, pl, ps, lw = getLaneClusters(warped, order=10, closetol=20, longtol=40,
                                scalef=0.025, plot=1, top=1)
plt.figure(figsize=(6, 6))
l, pl, ps, lw = getLaneClusters(warped, order=10, closetol=20, longtol=40,
                                scalef=0.025, plot=1, top=0)
# Calculate and filter the fits (more than 8 points) and optionally plot them.
# Also calculates curvature; these are 'low' quality and are improved later.
def getFits(labels, points_labels, points_values, labels_weights, plot=0):
    """Fit a second-order polynomial ``x = f(y)`` to each sufficiently large label.

    Args:
        labels: Iterable of label ids; each id indexes ``labels_weights``.
        points_labels: Array giving the label of each point.
        points_values: Array of ``[x, y]`` points.
        labels_weights: Number of points per label; only labels with more
            than 8 points are fitted.
        plot: When truthy, plot the fitted x values against y for each label.

    Returns:
        ``(labs, fits, curvs)``: the fitted label ids, their polynomial
        coefficients (highest power first), and the curvature radius of each
        fit evaluated at the smallest y of its points.
    """
    # The colormap is only needed for plotting; skip it otherwise.
    colors = plt.cm.Spectral(np.linspace(0, 1, len(labels))) if plot else None

    labs = []
    fits = []
    curvs = []
    for k in labels:
        if labels_weights[k] <= 8:
            continue
        points = points_values[points_labels == k]
        xvals = points[:, 0]
        yvals = points[:, 1]
        p_fit = np.polyfit(yvals, xvals, 2)
        y_eval = np.min(yvals)
        p_curverad = ((1 + (2*p_fit[0]*y_eval + p_fit[1])**2)**1.5)/np.absolute(2*p_fit[0])
        fits.append(p_fit)
        curvs.append(p_curverad)
        labs.append(k)
        if plot:
            p_fitx = p_fit[0]*yvals**2 + p_fit[1]*yvals + p_fit[2]
            plt.plot(p_fitx, yvals, 'o', color=colors[k], linewidth=3)
    if plot:
        plt.show()
    return labs, fits, curvs
# Test our clustering algorithms and the raw fits on one frame
image = cv2.imread('test_images/test4.jpg')
procimg = pipeline(image)
warped, M, Minv = transform(procimg)
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()
plt.imshow(warped, cmap='gray', aspect='auto')
plt.show()
labels, points_labels, points_values, labels_weights = getLaneClusters(
    warped, order=10, closetol=20, longtol=40, scalef=0.025, plot=1, top=1)
labels, fits, curvs = getFits(
    labels, points_labels, points_values, labels_weights, plot=1)
# Extract left and right fits within a possible range of the car; also
# optionally plot the fits found on each side
def getSidedFits(image, fits, curvs, tol=2*3.7, scalef=0.025, plot=0):
    """Split fits into left and right of the image centre line.

    The reference position is ``0.5 * scalef * image.shape[1] / 4``. Fits are
    visited in order of increasing distance between their constant term and
    that reference; those closer than ``tol`` go to the right list when the
    constant term is greater than the reference and to the left list when it
    is smaller.

    Args:
        image: Array whose ``shape`` supplies the width (and the height used
            for plotting).
        fits: Sequence of polynomial coefficient triples.
        curvs: Curvature value for each fit, in the same order.
        tol: Maximum accepted distance from the reference position.
        scalef: Scale factor used to compute the reference position.
        plot: When truthy, plot right fits with 'o' and left fits with 'x'.

    Returns:
        ``(left_fits, right_fits, curvs_left, curvs_right)``.
    """
    left_fits, right_fits = [], []
    curvs_left, curvs_right = [], []

    if len(fits) > 0:
        zerox = 0.5 * scalef * image.shape[1] / 4
        caroffset = np.abs(np.array([fit[2] for fit in fits]) - zerox)
        # Visit the closest fits to the car first.
        for idx in caroffset.argsort():
            if not caroffset[idx] < tol:
                continue
            base_x = fits[idx][2]
            if base_x > zerox:
                right_fits.append(fits[idx])
                curvs_right.append(curvs[idx])
            elif base_x < zerox:
                left_fits.append(fits[idx])
                curvs_left.append(curvs[idx])

    if plot:
        for side_fits, marker in ((right_fits, 'o'), (left_fits, 'x')):
            for l_fit in side_fits:
                l_fity = np.arange(0, 8 * scalef * image.shape[0] / 4, 1)
                l_fitx = l_fit[0]*l_fity**2 + l_fit[1]*l_fity + l_fit[2]
                plt.plot(l_fitx, l_fity, marker)
        plt.show()
    return left_fits, right_fits, curvs_left, curvs_right
# Use the found fits as the basis for an EM-style refinement: the warped
# image and the current fit are used to improve each fit iteratively
def optimFitEM(warped, fits, x_tol=0.5, scalef=0.025, plot=1):
    """Refine each fit against all non-zero pixels of the warped mask.

    For every initial fit, up to four rounds are run: pixels whose x lies
    within ``x_tol`` of the current polynomial are selected and, when more
    than 10 are found, a new second-order polynomial is fitted to them. A fit
    is kept only if its last round had more than 10 such pixels.

    Args:
        warped: 2-D binary image; pixel coordinates are multiplied by
            ``scalef`` and y is measured upward from the bottom row.
        fits: Initial polynomial coefficient triples (highest power first).
        x_tol: Maximum x distance, in scaled units, for a pixel to support a fit.
        scalef: Pixel-to-unit scale.
        plot: When truthy, print and plot the refined fits.

    Returns:
        ``(improved_fits, curvs)``: the refined coefficients and the curvature
        radius of each, evaluated at the smallest supporting y.
    """
    idxy, idxx = np.nonzero(warped)
    xvals = scalef * idxx
    yvals = scalef * (warped.shape[0] - idxy)

    improved_fits = []
    curvs = []
    for fit in fits:
        sfit = fit
        valid = False
        pyvals = None
        for _ in range(4):
            # Vectorized selection of the pixels supporting the current fit.
            predicted = sfit[0]*yvals**2 + sfit[1]*yvals + sfit[2]
            inliers = np.abs(predicted - xvals) < x_tol
            if inliers.sum() > 10:
                valid = True
                pyvals = yvals[inliers]
                sfit = np.polyfit(pyvals, xvals[inliers], 2)
            else:
                # The fit is unchanged, so further rounds would select the
                # same pixels again; stop here.
                valid = False
                break
        if valid:
            y_eval = np.min(pyvals)
            curv = ((1 + (2*sfit[0]*y_eval + sfit[1])**2)**1.5)/np.absolute(2*sfit[0])
            improved_fits.append(sfit)
            curvs.append(curv)

    if plot:
        print(improved_fits)
        print(curvs)
        # Plot over the scaled height of the mask itself rather than relying
        # on a module-level `image`. NOTE(review): this matches the previous
        # extent when `warped` is twice as tall as the source frame, as
        # produced by transform() -- confirm for other inputs.
        y_extent = scalef * warped.shape[0]
        for l_fit in improved_fits:
            l_fity = np.arange(0, y_extent, 1)
            l_fitx = l_fit[0]*l_fity**2 + l_fit[1]*l_fity + l_fit[2]
            plt.plot(l_fitx, l_fity, 'o')
        plt.show()
    return improved_fits, curvs
# Test the improvement process on one frame
image = cv2.imread('test_images/test4.jpg')
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()
procimg = pipeline(image)
plt.imshow(procimg, cmap='gray', aspect='auto')
plt.show()
warped, _, _ = transform(procimg)
plt.imshow(warped, cmap='gray', aspect='auto')
plt.show()
labels, points_labels, points_values, labels_weights = getLaneClusters(
    warped, order=10, closetol=20, longtol=40, scalef=0.025, plot=1, top=1)
labels, fits, curvs = getFits(
    labels, points_labels, points_values, labels_weights, plot=0)
fits, curvs = optimFitEM(warped, fits, x_tol=0.6, scalef=0.025, plot=1)

# Test the left/right classification of the refined fits
left_fits, right_fits, curvs_left, curvs_right = getSidedFits(
    image, fits, curvs, tol=3.7, scalef=0.025, plot=1)
print(curvs_left)
print(curvs_right)
# Module-level state for the video pipeline below: it is carried from one
# frame to the next by imposeLanes.

# Last accepted polynomial coefficients for each side.
last_best_l_fits = [0, 0, 0]
last_best_r_fits = [0, 0, 0]

# Curvature radius that went with those coefficients.
last_best_l_curv = 0
last_best_r_curv = 0

# Set to 1 while a side is running on its remembered fit.
using_left = 0
using_right = 0

# Smoothed curvature radius per side (0 means "not initialised yet").
gcleft = 0
gcright = 0
def imposeLanes(image, plot=0):
    """Detect the lane in a BGR frame and return the frame with a green overlay.

    The frame is thresholded (``pipeline``), warped (``transform``), clustered
    and fitted; fits are refined with ``optimFitEM`` and split into left and
    right with ``getSidedFits``. The closest fit on each side is drawn as a
    filled polygon, warped back and blended onto the undistorted frame, and
    the smoothed curvature radius and lateral offset are written as text.

    Module-level state (``last_best_*``, ``using_*``, ``gcleft``, ``gcright``)
    is read and updated so that a side with no fit in this frame falls back to
    its last accepted fit. Also reads the module-level ``mtx`` and ``dist``.

    Args:
        image: BGR frame.
        plot: When truthy, display the annotated frame.

    Returns:
        The annotated frame.
    """
    # Must be declared before any of these names is used in this function.
    global last_best_l_fits
    global last_best_r_fits
    global last_best_l_curv
    global last_best_r_curv
    global using_left
    global using_right
    global gcleft
    global gcright

    scalef = 0.025

    procimg = pipeline(image)
    image = cv2.undistort(image, mtx, dist, None, mtx)
    warped, M, Minv = transform(procimg)

    labels, points_labels, points_values, labels_weights = getLaneClusters(
        warped, 10, 20, 40, 0.025, plot=0, top=1)
    if len(labels) > 0:
        labels, fits, curvs = getFits(labels, points_labels, points_values,
                                      labels_weights, plot=0)
    else:
        labels, fits, curvs = [], [], []
    # Valid only if at least one label survived the fitting step.
    valid = len(labels) > 0

    # Add the previous best fits as candidates, but only if the marker was
    # lost on that side in the last frame or nothing was fitted in this one.
    if using_left or not valid:
        fits.append(np.array(last_best_l_fits))
    if using_right or not valid:
        fits.append(np.array(last_best_r_fits))

    # Optimize fits, then classify sides.
    fits, curvs = optimFitEM(warped, fits, x_tol=0.6, scalef=0.025, plot=0)
    left_fits, right_fits, curvs_left, curvs_right = getSidedFits(
        image, fits, curvs, tol=3, scalef=0.025, plot=0)

    warp_zero = np.zeros_like(warped).astype(np.uint8)
    color_warp = np.dstack((warp_zero, warp_zero, warp_zero))

    if len(left_fits) > 0:
        using_left = 0
        l_curv = curvs_left[0]
        l_fit = left_fits[0]
        last_best_l_fits[0] = l_fit[0]
        last_best_l_fits[1] = l_fit[1]
        last_best_l_fits[2] = l_fit[2]
        last_best_l_curv = curvs_left[0]
    else:
        using_left = 1
        l_fit = np.array([last_best_l_fits[0], last_best_l_fits[1], last_best_l_fits[2]])
        l_curv = last_best_l_curv

    if len(right_fits) > 0:
        using_right = 0
        r_fit = right_fits[0]
        r_curv = curvs_right[0]
        last_best_r_fits[0] = r_fit[0]
        last_best_r_fits[1] = r_fit[1]
        last_best_r_fits[2] = r_fit[2]
        last_best_r_curv = curvs_right[0]
    else:
        using_right = 1
        r_fit = np.array([last_best_r_fits[0], last_best_r_fits[1], last_best_r_fits[2]])
        r_curv = last_best_r_curv

    # Exponential smoothing of the curvature; the first value is taken as is.
    if gcleft == 0:
        gcleft = l_curv
    else:
        gcleft = 0.25 * (l_curv) + 0.75 * (gcleft)
    if gcright == 0:
        gcright = r_curv
    else:
        gcright = 0.25 * (r_curv) + 0.75 * (gcright)

    # Saturation
    if gcright > 1/0.0001:
        gcright = 1/0.0001
    if gcleft > 1/0.0001:
        gcleft = 1/0.0001

    left_offset = 4 - l_fit[2]
    right_offset = r_fit[2] - 4

    def lane_outline(fit):
        # Sample the fit in scaled units, convert to warped-mask pixels and
        # flip y so it is measured from the top of the mask.
        fity = (1) * np.arange(0, 8 * scalef * image.shape[0] / 4, 1)
        fitx = fit[0]*fity**2 + fit[1]*fity + fit[2]
        fity = (8 / (8 * scalef)) * fity
        fitx = (8 / (8 * scalef)) * fitx
        fity = 2*image.shape[0] - fity
        return fitx, fity

    l_fitx, l_fity = lane_outline(l_fit)
    r_fitx, r_fity = lane_outline(r_fit)

    # Left edge in order, right edge reversed, so the points trace a polygon.
    pts_left = np.array([np.transpose(np.vstack([l_fitx, l_fity]))])
    pts_right = np.array([np.flipud(np.transpose(np.vstack([r_fitx, r_fity])))])
    pts = np.hstack((pts_left, pts_right))
    cv2.fillPoly(color_warp, np.int_([pts]), (0, 255, 0))

    # Undo the resize applied in transform(), then undo the perspective warp.
    color_warp = cv2.resize(color_warp, (0, 0), fx=4, fy=1/2,
                            interpolation=cv2.INTER_AREA)
    newwarp = cv2.warpPerspective(color_warp, Minv, (image.shape[1], image.shape[0]))
    result = cv2.addWeighted(image, 1, newwarp, 0.3, 0)

    cv2.putText(result,
                'Curvature radius: ' + str( 0.5 * (gcleft + gcright) ) + ' m (average)',
                (50, 100),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (255, 255, 255),
                2)
    cv2.putText(result,
                'Lateral offset: ' + str( right_offset - left_offset ) + ' m (r_dist - l_dist)',
                (50, 150),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (255, 255, 255),
                2)
    if plot:
        plt.imshow(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
        plt.show()
    return result
# Test the video pipeline on the single frame held in `image`
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()
cv2.cvtColor(imposeLanes(image, plot=1), cv2.COLOR_BGR2RGB)
# This won't work on the smaller images though, due to the pixel size parameters
# Test pipeline on all test images
def appyPipeline(pathToImages, pipeline):
    """Call ``pipeline(image, 1)`` for each ``*.jpg`` found under a path prefix.

    Args:
        pathToImages: Path prefix; files matching ``pathToImages + '*.jpg'``
            are read with ``cv2.imread``.
        pipeline: Callable taking ``(image, flag)``; its result is discarded.
    """
    matches = glob.glob(pathToImages + '*.jpg')
    for fname in matches:
        frame = cv2.imread(fname)
        pipeline(frame, 1)
appyPipeline('test_images/', pipeline=imposeLanes)
# Apply the pipeline to the project video
# BEWARE, it will be slow!
import time

from moviepy.editor import VideoFileClip
import matplotlib.image as mpimg

# Start from a clean tracker state so the earlier runs of imposeLanes on
# test images do not influence the first video frames.
last_best_l_fits = [0, 0, 0]
last_best_r_fits = [0, 0, 0]
last_best_l_curv = 0
last_best_r_curv = 0
using_left = 0
using_right = 0
gcleft = 0
gcright = 0


def _imposeLanesRGB(frame):
    """Adapt imposeLanes, which works on BGR frames, to MoviePy's RGB frames."""
    bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    return cv2.cvtColor(imposeLanes(bgr), cv2.COLOR_BGR2RGB)


m_output = 'output.mp4'
clip1 = VideoFileClip("project_video.mp4")
m_clip = clip1.fl_image(_imposeLanesRGB)  # NOTE: this function expects color images!!

# Portable replacement for the IPython-only `%time` magic.
_started = time.time()
m_clip.write_videofile(m_output, audio=False)
print('Wall time: %.1f s' % (time.time() - _started))